package com.zhairuihao.utils;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 批处理工具
 */
public class BatchExec {
    /**
     * 线程池中线程最大允许数量
     */
    static final int MAX_THREAD_SIZE = 200;

    /**
     * 默认处理超时时间
     */
    static final int DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS = 15;

    /**
     * 分批执行处理命令,并汇总返回处理结果
     *
     * @param command 实际处理任务
     * @param data    需要分批处理的数据
     * @param size    分批大小
     * @return
     */
    public static <T, K> List<K> execute(
            ListCommand<T, K> command, List<T> data, int size) {
        return execute(command, data, size, DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS);
    }

    /**
     * 分批执行处理命令,并汇总返回处理结果
     *
     * @param command  实际处理任务
     * @param data     需要分批处理的数据
     * @param size     分批大小
     * @param executor 线程池
     * @return
     */
    public static <T, K> List<K> execute(
            ListCommand<T, K> command, List<T> data, int size, ExecutorService executor) {
        return execute(command, data, size, DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS, executor);
    }

    /**
     * 分批执行处理命令,并汇总返回处理结果
     *
     * @param command                 实际处理任务
     * @param data                    需要分批处理的数据
     * @param size                    分批大小
     * @param executeTimeoutInSeconds 每个处理超时时间
     * @return
     */
    public static <T, K> List<K> execute(
            ListCommand<T, K> command, List<T> data, int size, int executeTimeoutInSeconds) {
        return execute(command, data, size, executeTimeoutInSeconds, null);
    }

    /**
     * 分批执行处理命令,并汇总返回处理结果
     *
     * @param command                 实际处理任务
     * @param data                    需要分批处理的数据
     * @param size                    分批大小
     * @param executeTimeoutInSeconds 每个处理超时时间
     * @param executor                线程池
     * @return
     */
    public static <T, K> List<K> execute(
            ListCommand<T, K> command,
            List<T> data,
            int size,
            int executeTimeoutInSeconds,
            ExecutorService executor) {
        List<List<T>> splitDataList = split(data, size);
        List<Callable<List<K>>> tasks =
                splitDataList.stream()
                        .<Callable<List<K>>>map(splitData -> () -> command.execute(splitData))
                        .collect(Collectors.toList());
        try {
            List<K> resultList = new ArrayList<K>();
            int threadSize = Math.min(splitDataList.size(), MAX_THREAD_SIZE);
            executeTimeoutInSeconds =
                    executeTimeoutInSeconds <= 0
                            ? DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS
                            : executeTimeoutInSeconds;
            execute(
                    tasks,
                    executor == null ? Executors.newFixedThreadPool(threadSize) : executor,
                    executeTimeoutInSeconds,
                    executor == null)
                    .forEach(resultList::addAll);
            return resultList;
        } catch (Exception e) {
            return new ArrayList<>();
        }
    }

    /**
     * 分批执行处理命令,无返回结果
     *
     * @param command 实际处理任务
     * @param data    需要分批处理的数据
     * @param size    分批大小
     */
    public static <T> void execute(VoidCommand<T> command, List<T> data, int size) {
        execute(command, data, size, DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS);
    }

    /**
     * 分批执行处理命令,无返回结果
     *
     * @param command  实际处理任务
     * @param data     需要分批处理的数据
     * @param size     分批大小
     * @param executor 线程池
     */
    public static <T> void execute(
            VoidCommand<T> command, List<T> data, int size, ExecutorService executor) {
        execute(command, data, size, DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS, executor);
    }

    /**
     * 分批执行处理命令,无返回结果
     *
     * @param command 实际处理任务
     * @param data    需要分批处理的数据
     * @param size    分批大小
     */
    public static <T> boolean execute(
            VoidCommand<T> command, List<T> data, int size, long executeTimeoutInSeconds) {
        return execute(command, data, size, executeTimeoutInSeconds, null);
    }

    /**
     * 分批执行处理命令,无返回结果
     *
     * @param command  实际处理任务
     * @param data     需要分批处理的数据
     * @param size     分批大小
     * @param executor 线程池
     */
    public static <T> boolean execute(
            VoidCommand<T> command,
            List<T> data,
            int size,
            long executeTimeoutInSeconds,
            ExecutorService executor) {
        List<List<T>> splitDataList = split(data, size);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (List<T> splitData : splitDataList) {
            Callable<Integer> integerCallable =
                    () -> {
                        command.execute(splitData);
                        return 1;
                    };
            tasks.add(integerCallable);
        }
        try {
            executeTimeoutInSeconds =
                    executeTimeoutInSeconds <= 0
                            ? DEFAULT_EXECUTE_TIME_OUT_IN_SECONDS
                            : executeTimeoutInSeconds;
            int threadSize =
                    splitDataList.size() > MAX_THREAD_SIZE ? MAX_THREAD_SIZE : splitDataList.size();
            execute(
                    tasks,
                    executor == null ? Executors.newFixedThreadPool(threadSize) : executor,
                    executeTimeoutInSeconds,
                    executor == null);
        } catch (Exception e) {
            return false;
        }
        return true;
    }

    /**
     * 异步执行任务
     */
    public static <T> List<T> execute(
            Collection<? extends Callable<T>> tasks,
            ExecutorService executor,
            long executeTimeout,
            boolean shutdownExecutor)
            throws Exception {
        try {
            return executor.invokeAll(tasks).stream()
                    .map(
                            future -> {
                                try {
                                    return future.get(executeTimeout, TimeUnit.SECONDS);
                                } catch (Exception e) {
                                    return null;
                                }
                            })
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } finally {
            if (shutdownExecutor) {
                executor.shutdown();
            }
        }
    }

    /**
     * 将列表拆分成固定大小的子列表集合
     */
    public static <T> List<List<T>> split(List<T> data, int size) {
        List<List<T>> resultList = new ArrayList<>();
        int part = data.size() / size;
        for (int i = 0; i < part; i++) {
            resultList.add(data.subList(i * size, (i + 1) * size));
        }
        if (data.size() % size > 0) {
            resultList.add(data.subList(part * size, data.size()));
        }
        return resultList;
    }

    /**
     * 返回List处理结果
     */
    @FunctionalInterface
    public interface ListCommand<T, K> {
        List<K> execute(List<T> data);
    }

    /**
     * 无返回批量处理
     */
    @FunctionalInterface
    public interface VoidCommand<T> {
        void execute(List<T> data);
    }

    public static void main(String args[]) {
        List<Integer> execute = BatchExec.execute(list -> {
                    System.out.println(" 分片后的大小: " + list.size());
                    // do something
                    return list;
                },
                Arrays.asList(1, 2, 3, 4, 5, 6), 2
        );

        System.out.println(" execute " + execute);
        BatchExec.execute(list -> {
                    System.out.println(" 分片后的大小: " + list.size());
                    // do something
                },
                Arrays.asList(1, 2, 3, 4, 5, 6), 2
        );

    }
}